bitkeeper revision 1.1327.2.3 (4267b4918M714ImdecocSvKqAkVj1A)
authormjw@wray-m-3.hpl.hp.com <mjw@wray-m-3.hpl.hp.com>
Thu, 21 Apr 2005 14:11:29 +0000 (14:11 +0000)
committermjw@wray-m-3.hpl.hp.com <mjw@wray-m-3.hpl.hp.com>
Thu, 21 Apr 2005 14:11:29 +0000 (14:11 +0000)
Add some locking to console handling.
Remove a dead file.

Signed-off-by: Mike Wray <mike.wray@hp.com>
.rootkeys
tools/python/xen/xend/EventTypes.py [deleted file]
tools/python/xen/xend/server/console.py

index 1d950d569585c34b877412f711036a2453fbe4ba..410a16c677cdfe8a018da0df3cb7c21a3d5c8380 100644 (file)
--- a/.rootkeys
+++ b/.rootkeys
 40c9c468SNuObE_YWARyS0hzTPSzKg tools/python/xen/xend/Args.py
 41597996WNvJA-DVCBmc0xU9w_XmoA tools/python/xen/xend/Blkctl.py
 40c9c468Um_qc66OQeLEceIz1pgD5g tools/python/xen/xend/EventServer.py
-40c9c468U8EVl0d3G--8YXVg6VJD3g tools/python/xen/xend/EventTypes.py
 40c9c468QJTEuk9g4qHxGpmIi70PEQ tools/python/xen/xend/PrettyPrint.py
 40e15b7eeQxWE_hUPB2YTgM9fsZ1PQ tools/python/xen/xend/Vifctl.py
 4151594bBq8h-bwTfEt8dbBuojMtcA tools/python/xen/xend/XendAsynchProtocol.py
diff --git a/tools/python/xen/xend/EventTypes.py b/tools/python/xen/xend/EventTypes.py
deleted file mode 100644 (file)
index 6350baa..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-#   Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
-
-## XEND_DOMAIN_CREATE = "xend.domain.create": dom
-## create: 
-## xend.domain.destroy: dom, reason:died/crashed
-## xend.domain.up ?
-
-## xend.domain.unpause: dom
-## xend.domain.pause: dom
-## xend.domain.shutdown: dom
-## xend.domain.destroy: dom
-
-## xend.domain.migrate.begin: dom, to
-## Begin tells: src host, src domain uri, dst host. Dst id known?
-## err: src host, src domain uri, dst host, dst id if known, status (of domain: ok, dead,...), reason
-## end: src host, src domain uri, dst host, dst uri
-
-## Events for both ends of migrate: for exporter and importer?
-## Include migrate id so can tie together.
-## Have uri /xend/migrate/<id> for migrate info (migrations in progress).
-
-## (xend.domain.migrate.begin (src <host>) (src.domain <id>)
-##                            (dst <host>) (id <migrate id>))
-## xend.domain.migrate.end:
-## (xend.domain.migrate.end (domain <id>) (to <host>)
-
-## xend.node.up:  xend uri
-## xend.node.down: xend uri
-
-## xend.error ?
-
-## format:
-
index fbf0ff9bb91c822581c98c2313ad83cf17633ed3..339366fa6e36cc1283bf9d224dc535d8f05322a7 100755 (executable)
@@ -1,6 +1,7 @@
 # Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
 
 import socket
+import threading
 
 from xen.web import reactor, protocol
 
@@ -86,6 +87,7 @@ class ConsoleDev(Dev):
 
     def __init__(self, controller, id, config, recreate=False):
         Dev.__init__(self, controller, id, config)
+        self.lock = threading.RLock()
         self.status = self.STATUS_NEW
         self.addr = None
         self.conn = None
@@ -107,9 +109,13 @@ class ConsoleDev(Dev):
                        [self.id, self.getDomain(), self.console_port])
 
     def init(self, recreate=False, reboot=False):
-        self.destroyed = False
-        self.channel = self.getChannel()
-        self.listen()
+        try:
+            self.lock.acquire()
+            self.destroyed = False
+            self.channel = self.getChannel()
+            self.listen()
+        finally:
+            self.lock.release()
 
     def checkConsolePort(self, console_port):
         """Check that a console port is not in use by another console.
@@ -121,29 +127,41 @@ class ConsoleDev(Dev):
             ctrl.checkConsolePort(console_port)
     
     def sxpr(self):
-        val = ['console',
-               ['status', self.status ],
-               ['id',     self.id    ],
-               ['domain', self.getDomain() ] ]
-        val.append(['local_port',   self.getLocalPort()  ])
-        val.append(['remote_port',  self.getRemotePort() ])
-        val.append(['console_port', self.console_port    ])
-        val.append(['index', self.getIndex()])
-        if self.addr:
-            val.append(['connected', self.addr[0], self.addr[1]])
+        try:
+            self.lock.acquire()
+            val = ['console',
+                   ['status', self.status ],
+                   ['id',     self.id    ],
+                   ['domain', self.getDomain() ] ]
+            val.append(['local_port',   self.getLocalPort()  ])
+            val.append(['remote_port',  self.getRemotePort() ])
+            val.append(['console_port', self.console_port    ])
+            val.append(['index', self.getIndex()])
+            if self.addr:
+                val.append(['connected', self.addr[0], self.addr[1]])
+        finally:
+            self.lock.release()
         return val
 
     def getLocalPort(self):
-        if self.channel:
-            return self.channel.getLocalPort()
-        else:
-            return 0
+        try:
+            self.lock.acquire()
+            if self.channel:
+                return self.channel.getLocalPort()
+            else:
+                return 0
+        finally:
+            self.lock.release()
 
     def getRemotePort(self):
-        if self.channel:
-            return self.channel.getRemotePort()
-        else:
-            return 0
+        try:
+            self.lock.acquire()
+            if self.channel:
+                return self.channel.getRemotePort()
+            else:
+                return 0
+        finally:
+            self.lock.release()
 
     def uri(self):
         """Get the uri to use to connect to the console.
@@ -166,23 +184,31 @@ class ConsoleDev(Dev):
         print 'ConsoleDev>destroy>', self, reboot
         if reboot:
             return
-        self.status = self.STATUS_CLOSED
-        if self.conn:
-            self.conn.loseConnection()
-        self.listener.stopListening()
+        try:
+            self.lock.acquire()
+            self.status = self.STATUS_CLOSED
+            if self.conn:
+                self.conn.loseConnection()
+            self.listener.stopListening()
+        finally:
+            self.lock.release()
 
     def listen(self):
         """Listen for TCP connections to the console port..
         """
-        if self.closed():
-            return
-        if self.listener:
-            pass
-        else:
-            self.status = self.STATUS_LISTENING
-            cf = ConsoleFactory(self, self.id)
-            interface = xroot.get_console_address()
-            self.listener = reactor.listenTCP(self.console_port, cf, interface=interface)
+        try:
+            self.lock.acquire()
+            if self.closed():
+                return
+            if self.listener:
+                pass
+            else:
+                self.status = self.STATUS_LISTENING
+                cf = ConsoleFactory(self, self.id)
+                interface = xroot.get_console_address()
+                self.listener = reactor.listenTCP(self.console_port, cf, interface=interface)
+        finally:
+            self.lock.release()
 
     def connect(self, addr, conn):
         """Connect a TCP connection to the console.
@@ -193,27 +219,35 @@ class ConsoleDev(Dev):
 
         returns 0 if ok, negative otherwise
         """
-        if self.closed():
-            return -1
-        if self.connected():
-            return -1
-        self.addr = addr
-        self.conn = conn
-        self.status = self.STATUS_CONNECTED
-        self.writeOutput()
+        try:
+            self.lock.acquire()
+            if self.closed():
+                return -1
+            if self.connected():
+                return -1
+            self.addr = addr
+            self.conn = conn
+            self.status = self.STATUS_CONNECTED
+            self.writeOutput()
+        finally:
+            self.lock.release()
         return 0
 
     def disconnect(self, conn=None):
         """Disconnect the TCP connection to the console.
         """
         print 'ConsoleDev>disconnect>', conn
-        if conn and conn != self.conn: return
-        if self.conn:
-            self.conn.loseConnection()
-        self.addr = None
-        self.conn = None
-        self.status = self.STATUS_LISTENING
-        self.listen()
+        try:
+            self.lock.acquire()
+            if conn and conn != self.conn: return
+            if self.conn:
+                self.conn.loseConnection()
+            self.addr = None
+            self.conn = None
+            self.status = self.STATUS_LISTENING
+            self.listen()
+        finally:
+            self.lock.release()
 
     def receiveOutput(self, msg):
         """Receive output console data from the console channel.
@@ -223,30 +257,38 @@ class ConsoleDev(Dev):
         subtype minor message typ
         """
         # Treat the obuf as a ring buffer.
-        data = msg.get_payload()
-        data_n = len(data)
-        if self.obuf.space() < data_n:
-            self.obuf.discard(data_n)
-        if self.obuf.space() < data_n:
-            data = data[-self.obuf.space():]
-        self.obuf.write(data)
-        self.writeOutput()
+        try:
+            self.lock.acquire()
+            data = msg.get_payload()
+            data_n = len(data)
+            if self.obuf.space() < data_n:
+                self.obuf.discard(data_n)
+            if self.obuf.space() < data_n:
+                data = data[-self.obuf.space():]
+            self.obuf.write(data)
+            self.writeOutput()
+        finally:
+            self.lock.release()
         
     def writeOutput(self):
         """Handle buffered output from the console device.
         Sends it to the connected TCP connection (if any).
         """
-        if self.closed():
-            return -1
-        if not self.conn:
-            return 0
-        while not self.obuf.empty():
-            try:
-                bytes = self.conn.write(self.obuf.peek())
-                if bytes > 0:
-                    self.obuf.discard(bytes)
-            except socket.error:
-                pass
+        try:
+            self.lock.acquire()
+            if self.closed():
+                return -1
+            if not self.conn:
+                return 0
+            while not self.obuf.empty():
+                try:
+                    bytes = self.conn.write(self.obuf.peek())
+                    if bytes > 0:
+                        self.obuf.discard(bytes)
+                except socket.error:
+                    pass
+        finally:
+            self.lock.release()
         return 0
     
     def receiveInput(self, conn, data):
@@ -257,20 +299,28 @@ class ConsoleDev(Dev):
         conn connection
         data input data
         """
-        if self.closed(): return -1
-        if conn != self.conn: return 0
-        self.ibuf.write(data)
-        self.writeInput()
+        try:
+            self.lock.acquire()
+            if self.closed(): return -1
+            if conn != self.conn: return 0
+            self.ibuf.write(data)
+            self.writeInput()
+        finally:
+            self.lock.release()
         return 0
 
     def writeInput(self):
         """Write pending console input to the console channel.
         Writes as much to the channel as it can.
         """
-        while self.channel and not self.ibuf.empty():
-            msg = xu.message(CMSG_CONSOLE, 0, 0)
-            msg.append_payload(self.ibuf.read(msg.MAX_PAYLOAD))
-            self.channel.writeRequest(msg)
+        try:
+            self.lock.acquire()
+            while self.channel and not self.ibuf.empty():
+                msg = xu.message(CMSG_CONSOLE, 0, 0)
+                msg.append_payload(self.ibuf.read(msg.MAX_PAYLOAD))
+                self.channel.writeRequest(msg)
+        finally:
+            self.lock.release()
 
 class ConsoleController(DevController):
     """Device controller for all the consoles for a domain.